/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.search.service.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.search.configuration.SearchConfigurationConstants;
import org.unidata.mdm.search.context.ComplexSearchRequestContext;
import org.unidata.mdm.search.context.SearchRequestContext;
import org.unidata.mdm.search.exception.SearchExceptionIds;
import org.unidata.mdm.search.type.IndexField;
import org.unidata.mdm.search.type.IndexType;
import org.unidata.mdm.search.util.SearchUtils;
import org.unidata.mdm.system.exception.PlatformFailureException;
import org.unidata.mdm.system.type.annotation.ConfigurationRef;
import org.unidata.mdm.system.type.configuration.ConfigurationValue;

/**
 * @author Mikhail Mikhailov
 */
@Component
public class AdminComponentImpl extends BaseComponentImpl /*, ConfigurationUpdatesConsumer */ {
    /**
     * Logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(AdminComponentImpl.class);

    private static final boolean DEFAULT_INDEX_RELATIONS_STRAIGHT = true;

    private static final Integer DELETE_QUERY_SIZE = 10000;

    /**
     * Transport client to use.
     */
    @Autowired
    private Client client;

    /**
     * Admin action timeout.
     */
    @ConfigurationRef(SearchConfigurationConstants.PROPERTY_ADMIN_ACTION_TIMEOUT)
    private ConfigurationValue<Long> adminActionTimeout;

    /**
     * Empty args constructor, used by container.
     */
    public AdminComponentImpl() {
        super();
    }

    /**
     * One arg constructor, used by utility.
     */
    public AdminComponentImpl(Client client) {
        super();
        this.client = client;
    }

    /**
     * Sets several system fields at once.
     *
     * @param ctx the context
     * @param fields fields to set
     * @return true, if successful, false otherwise
     */
    public boolean update(final SearchRequestContext ctx, Map<? extends IndexField, Object> fields, boolean refreshImmediate) {

        // 1. Compose the name of the type
        final String indexName = constructIndexName(ctx);
        final QueryBuilder qb = createGeneralQueryFromContext(ctx, ctx.getNestedSearch());
        try (XContentBuilder builder = XContentFactory.jsonBuilder()) {

            // 2. Set marks
            builder.startObject();
            for (Entry<? extends IndexField, Object> entry : fields.entrySet()) {
                builder.field(entry.getKey().getName(), entry.getValue());
            }
            builder.endObject();

            return updateInternal(qb, builder, ctx.getType(), refreshImmediate, indexName);
        } catch (IOException e) {
            final String message = "XContentBuilder threw an exception. {}.";
            throw new PlatformFailureException(message, e, SearchExceptionIds.EX_SEARCH_MARK_DOCUMENT_FAILED);
        }
    }

    /**
     * Sets several system fields at once.
     *
     * @param ctx the context
     * @param fields fields to set
     * @return true, if successful, false otherwise
     */
    public boolean update(final ComplexSearchRequestContext ctx, Map<? extends IndexField, Object> fields, boolean refreshImmediate) {

        // 1. Compose the name of the type
        Map<SearchRequestContext, QueryBuilder> queries = createQueryForComplex(ctx);
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
        queries.values().forEach(boolQuery::should);
        String[] indexNames = ctx.getAllContexts()
                .stream()
                .map(this::constructIndexName)
                .distinct()
                .toArray(String[]::new);

        try (XContentBuilder builder = XContentFactory.jsonBuilder()) {

            // 1. Set marks
            builder.startObject();
            for (Entry<? extends IndexField, Object> entry : fields.entrySet()) {
                builder.field(entry.getKey().getName(), entry.getValue());
            }
            builder.endObject();
            return updateInternal(boolQuery, builder, null, refreshImmediate, indexNames);
        } catch (IOException e) {
            final String message = "XContentBuilder threw an exception. {}.";
            throw new PlatformFailureException(message, e, SearchExceptionIds.EX_SEARCH_MARK_DOCUMENT_FAILED);
        }
    }

    /**
     * @param query - query
     * @param update - updates
     * @param searchType - can be null, if defined search will be more strict
     * @param indexNames - index names
     * @return true if successful, false otherwise
     */
    private boolean updateInternal(@Nonnull QueryBuilder query, @Nonnull XContentBuilder update,
                                   @Nullable IndexType searchType, boolean refreshImmediate, final String... indexNames) {

        SearchRequestBuilder srb = client.prepareSearch(indexNames)
                .setFetchSource(false)
                .setQuery(query)
                .setSize(5000)
                .setPostFilter(createPostFilterQuery(searchType, null));

        SearchResponse idsResponse = executeRequest(srb);

        SearchHit[] hits = idsResponse.getHits().getHits();
        if (hits.length > 0) {

            final BulkRequestBuilder bulkRequest = client.prepareBulk()
                    .setRefreshPolicy(refreshImmediate
                            ? WriteRequest.RefreshPolicy.IMMEDIATE
                            : WriteRequest.RefreshPolicy.NONE);

            for (SearchHit hit : hits) {

                UpdateRequestBuilder requestBuilder = client.prepareUpdate()
                        .setFetchSource(false)
                        .setId(hit.getId())
                        .setIndex(hit.getIndex())
                        .setDoc(update);

                bulkRequest.add(requestBuilder);
            }

            final BulkResponse bulkResponse = executeRequest(bulkRequest);
            if (bulkResponse.hasFailures()) {

                final String message = bulkResponse.buildFailureMessage();
                LOGGER.error("Failure while updating index: {}.", message);
            }

            return !bulkResponse.hasFailures();
        }

        return true;
    }

    /**
     * Deletes documents for request.
     *
     * @param ctx - context
     * @return true if successful, false otherwise
     */
    public boolean delete(final SearchRequestContext ctx, boolean refreshImmediate) {
        final String indexName = constructIndexName(ctx);
        QueryBuilder queryBuilder = createGeneralQueryFromContext(ctx, ctx.getNestedSearch());

        return deleteInternal(queryBuilder, ctx.getType(), refreshImmediate, indexName);
    }

    /**
     * Deletes documents for request.
     *
     * @param context - context
     * @return true if successful, false otherwise
     */
    public boolean delete(final ComplexSearchRequestContext context, boolean refreshImmediate) {
        Map<SearchRequestContext, QueryBuilder> queries = createQueryForComplex(context);
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
        queries.values().forEach(boolQuery::should);
        String[] indexNames = context.getAllContexts()
                .stream()
                .map(this::constructIndexName)
                .distinct()
                .toArray(String[]::new);

        return deleteInternal(boolQuery, null, refreshImmediate, indexNames);
    }

    /**
     * @param ctx the context
     * @return true if successful, false otherwise
     */
    public boolean deleteAll(final SearchRequestContext ctx, boolean refresh) {

        final String indexName = constructIndexName(ctx);
        QueryBuilder queryBuilder = createGeneralQueryFromContext(ctx, ctx.getNestedSearch());
        return new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
                    .filter(queryBuilder)
                    .source(indexName)
                    .refresh(refresh)
                    .get().getBulkFailures().isEmpty();
    }

    /**
     * General method for removing something from ES
     *
     * @param qb the query builder
     * @param searchType type of search type
     * @param indexNames - collection of index names.
     * @return true if successful, false otherwise
     */
    private boolean deleteInternal(@Nonnull QueryBuilder qb,
                                   @Nullable IndexType searchType, boolean refreshImmediate, final String... indexNames) {

        Collection<DeleteRequestBuilder> deletes = processDelete(qb, searchType, indexNames);
        if (CollectionUtils.isEmpty(deletes)) {
            return true;
        }

        final BulkRequestBuilder bulkRequest = client.prepareBulk().setRefreshPolicy(refreshImmediate
                ? WriteRequest.RefreshPolicy.IMMEDIATE
                : WriteRequest.RefreshPolicy.NONE);

        for (DeleteRequestBuilder drb : deletes) {
            bulkRequest.add(drb);
        }

        if (refreshImmediate) {

            final BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            boolean hasFailures = bulkResponse.hasFailures();
            if (hasFailures) {
                final String message = bulkResponse.buildFailureMessage();
                LOGGER.error("Error during delete {}.", message);
            }

            return !hasFailures;
        } else {
            bulkRequest.execute();
            return true;
        }
    }

    /**
     * Collects delete objects.
     *
     * @param qb query
     * @param searchType search type
     * @param indexNames index names
     * @return collection
     */
    private Collection<DeleteRequestBuilder> processDelete(@Nonnull QueryBuilder qb, @Nullable IndexType searchType, final String... indexNames) {

        SearchRequestBuilder srb = client.prepareSearch(indexNames)
                .setFetchSource(false)
                .setQuery(qb)
                .addSort(SearchUtils.ID_FIELD, SortOrder.ASC)
                .setPostFilter(createPostFilterQuery(searchType, null))
                .setSize(DELETE_QUERY_SIZE);

        List<DeleteRequestBuilder> result = new ArrayList<>();
        while (true) {

            SearchResponse idsResponse = executeRequest(srb);
            SearchHit[] hits = idsResponse.getHits().getHits();

            for (SearchHit hit : hits) {

                DocumentField routing = hit.getFields().get("_routing");
                result.add(client.prepareDelete()
                        .setId(hit.getId())
                        .setIndex(hit.getIndex())
                        .setRouting(Objects.nonNull(routing) ? routing.getValue() : null));
            }

            // After deleting, we should check for more records
            if (hits.length < DELETE_QUERY_SIZE) {
                break;
            } else {
                srb = client.prepareSearch(indexNames)
                        .setFetchSource(false)
                        .setQuery(qb)
                        .addSort(SearchUtils.ID_FIELD, SortOrder.ASC)
                        .setPostFilter(createPostFilterQuery(searchType, null))
                        .searchAfter(hits[hits.length - 1].getSortValues())
                        .setSize(DELETE_QUERY_SIZE);
            }
        }

        return result;
    }

    /**
     * {@inheritDoc}
     */
    public boolean setClusterSettings(Map<String, Object> settings, boolean persistent) {

        // 1. Set settings
        ClusterUpdateSettingsRequestBuilder b = client.admin()
                .cluster()
                .prepareUpdateSettings();

        if (persistent) {
            b.setPersistentSettings(settings);
        } else {
            b.setTransientSettings(settings);
        }

        // 2. Execute
        return b.execute()
                .actionGet(adminActionTimeout.getValue())
                .isAcknowledged();
    }
}
